package main

import (
	"fmt"
	sarama "gitee.com/tym_hmm/go-kafa-shopify-sarama"
	"sync"
	"time"
)

func main() {
	var wg = &sync.WaitGroup{}
	wg.Add(1)
	go func() {
		defer wg.Done()
		initAsyncProduct("test")
	}()
	wg.Add(1)
	go func() {
		defer wg.Done()
		initAsyncProduct("weg2_log")
	}()
	wg.Wait()
}

func initAsyncProduct(topic string) {
	//config := sarama.NewConfig()
	//config.Producer.

	client, err := sarama.NewAsyncProducer([]string{"192.168.186.130:9092", "192.168.186.201:9092", "192.168.186.202:9092"}, nil)
	if err != nil {
		fmt.Println("producer closed, err:", err)
		return
	}
	defer client.Close()
	content := "Cluster Support multiple Zookeeper under the Kafka cluster display, select different Zookeeper to monitor their corresponding Kafka cluster state.Cluster Support multiple Zookeeper under the Kafka cluster display, select different Zookeeper to monitor their corresponding Kafka cluster state.Cluster Support multiple Zookeeper under the Kafka cluster display, select different Zookeeper to monitor their corresponding Kafka cluster state.Cluster Support multiple Zookeeper under the Kafka cluster display, select different Zookeeper to monitor their corresponding Kafka cluster state.Cluster Support multiple Zookeeper under the Kafka cluster display, select different Zookeeper to monitor their corresponding Kafka cluster state.Cluster Support multiple Zookeeper under the Kafka cluster display, select different Zookeeper to monitor their corresponding Kafka cluster state.Cluster Support multiple Zookeeper under the Kafka cluster display, select different Zookeeper to monitor their corresponding Kafka cluster state.Cluster Support multiple Zookeeper under the Kafka cluster display, select different Zookeeper to monitor their corresponding Kafka cluster state.Cluster Support multiple Zookeeper under the Kafka cluster display, select different Zookeeper to monitor their corresponding Kafka cluster state.Cluster Support multiple Zookeeper under the Kafka cluster display, select different Zookeeper to monitor their corresponding Kafka cluster state.Cluster Support multiple Zookeeper under the Kafka cluster display, select different Zookeeper to monitor their corresponding Kafka cluster state.Cluster Support multiple Zookeeper under the Kafka cluster display, select different Zookeeper to monitor their corresponding Kafka cluster state.Cluster Support multiple Zookeeper under the Kafka cluster display, select different Zookeeper to monitor their corresponding Kafka cluster state.Cluster Support multiple Zookeeper under the Kafka cluster display, select different Zookeeper to monitor their corresponding Kafka cluster state.Cluster Support multiple Zookeeper under the Kafka cluster display, select different Zookeeper to monitor their corresponding Kafka cluster state.Cluster Support multiple Zookeeper under the Kafka cluster display, select different Zookeeper to monitor their corresponding Kafka cluster state.Cluster Support multiple Zookeeper under the Kafka cluster display, select different Zookeeper to monitor their corresponding Kafka cluster state.Cluster Support multiple Zookeeper under the Kafka cluster display, select different Zookeeper to monitor their corresponding Kafka cluster state.Cluster Support multiple Zookeeper under the Kafka cluster display, select different Zookeeper to monitor their corresponding Kafka cluster state.Cluster Support multiple Zookeeper under the Kafka cluster display, select different Zookeeper to monitor their corresponding Kafka cluster state.Cluster Support multiple Zookeeper under the Kafka cluster display, select different Zookeeper to monitor their corresponding Kafka cluster state.Cluster Support multiple Zookeeper under the Kafka cluster display, select different Zookeeper to monitor their corresponding Kafka cluster state.Cluster Support multiple Zookeeper under the Kafka cluster display, select different Zookeeper to monitor their corresponding Kafka cluster state.Cluster Support multiple Zookeeper under the Kafka cluster display, select different Zookeeper to monitor their corresponding Kafka cluster state."
	msg := &sarama.ProducerMessage{}
	msg.Topic = topic
	msg.Value = sarama.StringEncoder(content)

	chans := client.Input()
	for {
		//go func(input chan<- *sarama.ProducerMessage, msg *sarama.ProducerMessage) {
		chans <- msg
		time.Sleep(time.Second)
		//}(chans, msg)
	}
	//fmt.Println("xxx")
	// wait response
	//select {
	////case msg := <-producer.Successes():
	////    log.Printf("Produced message successes: [%s]\n",msg.Value)
	//case err := <-client.Errors():
	//	log.Println("Produced message failure: ", err)
	//default:
	//	log.Println("Produced message default")
	//}
}

func initSyncProduct() {
	// 连接kafka
	client, err := sarama.NewSyncProducer([]string{"192.168.186.130:9092", "192.168.186.201:9092", "192.168.186.202:9092"}, nil)
	if err != nil {
		fmt.Println("producer closed, err:", err)
		return
	}

	defer client.Close()
	//例子一发单个消息
	// 构造一个消息
	msg := &sarama.ProducerMessage{}
	msg.Topic = "test"
	content := "this is a test log"
	send01(client, msg, content)

	//例子二发多个消息
	for {

		for _, word := range []string{"Welcome11", "to", "the", "Confluent", "Kafka", "Golang", "client"} {
			send01(client, msg, word)
		}
	}
}

//发消息
func send01(client sarama.SyncProducer, msg *sarama.ProducerMessage, content string) {
	msg.Value = sarama.StringEncoder(content)

	// 发送消息
	pid, offset, err := client.SendMessage(msg)
	if err != nil {
		fmt.Println("send msg failed, err:", err)
		return
	}
	fmt.Printf("pid:%v offset:%v\n", pid, offset)

}
